/*
    Copyright 2006-2012 Patrik Jonsson, sunrise@familjenjonsson.org

    This file is part of Sunrise.

    Sunrise is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    Sunrise is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with Sunrise.  If not, see <http://www.gnu.org/licenses/>.

*/

/** \file

    Implementation of the xfer class methods. */

#ifndef __xfer_impl__
#define __xfer_impl__

#include "xfer.h"
#include "config.h"
#include <cstdio>
#include <algorithm> 
#include <memory>
#include "hpm.h"
#include <fstream> 
#include "boost/lexical_cast.hpp"
#include "boost/lambda/lambda.hpp"
#include "boost/lambda/bind.hpp"
#include "constants.h"
#include "emission.h"
#include "threadlocal.h"
#include <tbb/tick_count.h>
#include <boost/variant.hpp>
#include "almost_equal.h"


/** Constructor. Sets up the transfer class to handle a specified grid
    and send the output to a specified emergence object. It also
    creates the local and remote queues. */
template <typename dust_model_type, typename grid_type>
mcrx::xfer<dust_model_type, grid_type>::
xfer(T_grid& grid, const T_emission& emi, T_emergence& eme, 
     const T_dust_model& dm, T_rng_policy& rng, const T_biaser& b,
     T_float i_min, T_float i_max,
     bool add_intensities,
     bool immediate_reemission, int minscat, int maxscat) :
  g (grid), current_cell (), b_(b),
  ray_min_i_ (i_min), ray_max_i_(i_max), 
  emergence_ (eme), dust_model(dm), rng_(rng),
  local_pending_(new T_ray_queue()),
  px_queue_(new T_pixel_queue()),
  emission_ (emi), 
  dn_(make_thread_local_copy(dust_model.zero_density())),
  add_intensities_(add_intensities),
  immediate_reemission_(immediate_reemission),
  n_scat_min_(minscat), n_scat_max_(maxscat),
  thread_num_(-1), mpi_master_(0),
  integrations_per_cell_(2)
  // we do not need to initialize the ray here because it has a fully
  // compliant assignment operator that resizes everything as
  // necessary
{
  current_emergence_ = &emergence_;

  consolidate_arrays(i_in_);
}


/**  Copy constructor.  Makes a copy of an xfer object, including the
     current cell and random number generator state. The shared_ptrs
     to the queues are copied so that all xfer instances that are
     copies of the same original one share these queues.  */
template <typename dust_model_type, typename grid_type>
mcrx::xfer<dust_model_type, grid_type>::
xfer(const xfer& rhs):
  g (rhs.g), current_cell (rhs.current_cell), 
  ray_min_i_ (rhs.ray_min_i_), b_(rhs.b_),
  local_pending_(rhs.local_pending_),
  px_queue_(rhs.px_queue_),
  rng_(rhs.rng_),
  ray_ (rhs.ray_),
  ray_max_i_(rhs.ray_max_i_), 
  emergence_ (rhs.emergence_), dust_model (rhs.dust_model),
  emission_ (rhs.emission_), 
  dn_(make_thread_local_copy(dust_model.zero_density())),
  add_intensities_(rhs.add_intensities_),
  immediate_reemission_(rhs.immediate_reemission_),
  n_scat_min_(rhs.n_scat_min_), n_scat_max_(rhs.n_scat_max_),
  thread_num_(-1), mpi_master_(rhs.mpi_master_),
  integrations_per_cell_(rhs.integrations_per_cell_)
{
  current_emergence_ = &emergence_;

  consolidate_arrays(i_in_);
}

/** Destructor adds the data in the local emergence to the main
    emergence, if applicable. */
template <typename dust_model_type, typename grid_type>
mcrx::xfer<dust_model_type, grid_type>::~xfer ()
{
}

template <typename dust_model_type, typename grid_type>
template <typename TT>
void mcrx::xfer<dust_model_type, grid_type>::
consolidate_arrays(blitz::Array<TT, 1>&)
{
  const int narr=10;
  // Pad the allocated memory so different threads don't share cache
  // lines.
  const int buffer=2; 
  // allocate the space and set this block to not use locking
  array_2 temp(narr+buffer*2, emission().zero_lambda().size());
  threadLocal_warn(temp);
  int i=buffer;
  // Because temp is local to this function, we can't return *weak*
  // references.
  norm_.reference(temp(i++,blitz::Range::all()));
  i_in_.reference(temp(i++,blitz::Range::all()));
  i_out_.reference(temp(i++,blitz::Range::all()));
  dtau_.reference(temp(i++,blitz::Range::all()));
  tau_exit_.reference(temp(i++,blitz::Range::all()));
  onemexptau_exit_.reference(temp(i++,blitz::Range::all()));
  scattering_tau_.reference(temp(i++,blitz::Range::all()));
  original_intensity_.reference(temp(i++,blitz::Range::all()));
  intensity_to_camera_.reference(temp(i++,blitz::Range::all()));
  // The ray is also thread-local, so we set it to refer to this too
  ray_.set_intensity_ref(temp(i++,blitz::Range::all()));
  assert(i==narr+buffer);

  T_densities tmpdensity(make_thread_local_copy(dust_model.zero_density()));
  ray_.set_column_ref(tmpdensity);
}


/** This man worker function. This loops forever (until it finds an
    invalid action on the queue), either popping rays off the queue
    and processing them, or if the queue is empty and we have rays
    left, producing a ray. If integration is true, we're doing ray
    intensity integrations and we pull a pixel position off of the
    px_queue_ instead of calling emit.  */
template <typename dust_model_type, typename grid_type>
void
mcrx::xfer<dust_model_type, grid_type>::
worker_loop (tbb::atomic<long>& n_rays, long n_rays_desired, 
	     boost::function<bool (xfer*,tbb::atomic<long>&,long)> creation_func)
{
  using namespace std;
  DEBUG(1,printf("Task %d-%d worker thread starting\n",task(), thread()););
  tbb::tick_count start=tbb::tick_count::now();
  hpm::thread_start(Worker);

  T_queue_item next_item;

  while(true) {
    /*
    const tbb::tick_count now=tbb::tick_count::now();
    if((now-start).seconds()>180) {
      hpm::dump_data(string("Worker thread ")+boost::lexical_cast<string>(task())+"-"+boost::lexical_cast<string>(thread()));
      start=now;
    }
    */

    const int n= local_queue()->size();
    bool popped;
    // if we are done with the local rays, we switch from nonblocking
    // to blocking pop so that the master can see how many threads are
    // idle.
    if((n_rays==n_rays_desired) && pixel_queue()->empty()) {
      popped=true;
      local_queue()->pop(next_item);
    }
    else {
      popped = local_queue()->try_pop(next_item);
    }

    if(popped) {
      // we got a ray off the queue. Figure out how to deal with it
      //cout << "popped tracker "; next_item.cell_.print(cout) << endl;

      if(next_item.action_==T_queue_item::invalid) {
	// if we find an item with an "invalid" action, return;
	DEBUG (1, printf("Worker %d-%d popped invalid action, exiting\n", task(),thread()););
	return;
      }

      // Restore the ray, normalization and cell tracker. These
      // assignment operators copy the elements, so we keep the
      // ray/norm pointing to the local data for cache coherency.
      ray_ = next_item.ray_;
      g.initialize_tracker(next_item.cell_, ray_.position(), current_cell);
      norm_ = next_item.norm_;

      // Run the ray according to the action
      switch (next_item.action_) {

      case T_queue_item::scatter:
	// For scatter rays, the parameter is the scattering
	// species. Scatter the ray first and then fall through to the mainloop:
	DEBUG(1,printf("Worker %d-%d popped scatter ray for species %d, queue depth %d\n",task(), thread(), next_item.param_, n); cout.flush(););
	scatter(next_item.param_);
	break;
      
      case T_queue_item::propagate:
	// For propagating rays, we immediately enter the mainloop.
	DEBUG(1,printf("Worker %d-%d popped propagate ray, queue depth %d\n",task(), thread(),n); cout.flush(););
	break;

      case T_queue_item::emerge:
	// For emerging rays, the parameter is the camera number. just
	// run camera ray and we are done
	DEBUG(1,printf("Worker %d-%d popped camera ray for camera %d, queue depth %d\n",task(), thread(), next_item.param_, n); cout.flush(););
	i_in_=1; intensity_to_camera_=1;
	camera_ray(next_item.param_);
	continue;

      case T_queue_item::integrate:
	// For Integration rays parameter is the camera number.
	DEBUG(1,printf("Worker %d-%d popped integration ray for camera %d, queue depth %d\n",task(), thread(), next_item.param_, n); cout.flush(););
	integrate_ray(next_item.param_, false);
	continue;

      case T_queue_item::emission:
	// For Integration rays parameter is the camera number.
	DEBUG(1,printf("Worker %d-%d popped column integration ray for camera %d, queue depth %d\n",task(), thread(), next_item.param_, n); cout.flush(););
	integrate_ray(next_item.param_, true);
	continue;

      default:
	cerr << "Unknown action in worker loop!" << endl;
	throw 1;
      };
    }
    else {
      // we did not get ray off the local queue. If sending is
      // enabled, we are allowed to create one. (We can't create one
      // if sending is disabled because we might need to send it and
      // then we would deadlock.)
      
      if(!mpi_master().send_enabled())
	continue;

      // call the creation function to emit/create an integration ray
      if(!creation_func(this, n_rays, n_rays_desired))
	continue;
      }

    ray_mainloop();
    assert(!current_cell);

    if(current_cell.task() >=0) {
      // The ray left the domain. Push it to the other task
      if(current_cell.task()==task()) {
	cerr << "Error: current_cell inconsistency on task " << task()<< ": ";
	current_cell.print(cerr) << endl;
      }
      assert(current_cell.task()!=task());
      push_ray(T_queue_item::propagate, ray_, current_cell, norm_);
    }

    // coming out here, the ray is done. we try to pop another.
  }
}

template <typename dust_model_type, typename grid_type>
bool
mcrx::xfer<dust_model_type, grid_type>::
create_work_shooting (tbb::atomic<long>& n_rays, long n_rays_desired)
{
  // see if we have rays left to emit
  bool add_ray = increment_if_less_than(n_rays, n_rays_desired);
  if(!add_ray) {
    // no rays left to be produced. go back to trying the queue
    return false;
  }
  
  // we should produce a ray. Call emit and then enter mainloop
  DEBUG(1,printf("Empty queue, Worker %d-%d producing ray, nrays %ld\n",task(), thread(), long(n_rays)); cout.flush(););
  emit(false, n_rays_desired);
  return true;
};

template <typename dust_model_type, typename grid_type>
bool
mcrx::xfer<dust_model_type, grid_type>::
create_work_intensity_integration (tbb::atomic<long>& n_rays, 
				   long n_rays_desired)
{
  // see if we have pixel positions left
  T_pixel_queue_item pxi;
  bool popped = px_queue_->try_pop(pxi);
  if(!popped)
    // no pixels left. go back to trying the queue
    return false;
  
  // we got a pixel. integrate it. 
  DEBUG(1,printf("Empty queue, Worker %d-%d producing integration ray for pixel (%d, %f, %f)\n",this->task(), thread(), pxi.first, pxi.second[0], pxi.second[1]); cout.flush(););
  integrate_ray(pxi.second, pxi.first, false);
  current_cell.reset();
  // But we still want to return *false*, because we're done with this
  // ray, it's already been pushed
  return false;
};

template <typename dust_model_type, typename grid_type>
bool
mcrx::xfer<dust_model_type, grid_type>::
create_work_emission_integration (tbb::atomic<long>& n_rays, 
				  long n_rays_desired)
{
  // see if we have pixel positions left
  T_pixel_queue_item pxi;
  bool popped = px_queue_->try_pop(pxi);
  if(!popped)
    // no pixels left. go back to trying the queue
    return false;
  
  // we got a pixel. integrate it.
  DEBUG(1,printf("Empty queue, Worker %d-%d producing emission integration ray for pixel (%d, %f, %f)\n",task(), thread(), pxi.first, pxi.second[0], pxi.second[1]); cout.flush(););
  integrate_ray(pxi.second, pxi.first, true);
  current_cell.reset();
  // But we still want to return *false*, because we're done with this
  // ray, it's already been pushed
  return false;
};


/** Generates a ray and calls the ray main loop until the queue is empty. */
template <typename dust_model_type, typename grid_type>
void
mcrx::xfer<dust_model_type, grid_type>::shoot ()
{
  hpm::enter_stage(Worker);
  // Generate a ray
  DEBUG(2,cout << "\nShooting new ray" << endl;);
  emit(false);
 
  while(true) {
    ray_mainloop();

    // We now try to pop a ray off the queue in case there are split
    // rays to process. Popping them sets us up for a new mainloop
    int r;
    const T_action a = pop_ray (r);
    assert(a!=T_queue_item::emerge);
    if(a == T_queue_item::invalid)
      // nothing on queue
      break;

    if(a == T_queue_item::scatter)
      scatter(r);
    // if a==propagate, we just reenter mainloop.
  }

}


/** The main loop for processing a ray. It propagates the ray until
    scattering and then repeats until the ray exits or is killed. */
template <typename dust_model_type, typename grid_type>
void
mcrx::xfer<dust_model_type, grid_type>::ray_mainloop ()
{
  hpm::scoped_stage current(Mainloop);
  while(current_cell) {

    DEBUG(2,cout << "ray max I/max L/max bias: " << max(ray_.intensity()) << '\t' << max(ray_.intensity()*norm_) << '\t' << max(b_.bias_factor(ray_.intensity())) << endl;);
    DEBUG(1,if(!condition_all(ray_.intensity()< ray_max_i_)) \
	      cout << "WARNING: Max ray intensity " << max(ray_.intensity())<< " at lambda " << maxIndex(ray_.intensity()) << " reference " << b_ << " refintensity " << b_.reference1(ray_.intensity()) << endl;);

    // we ALWAYS force scatter
    process_ray_forced();

    // coming out here, we have propagated the ray to either the
    // scattering point or until it left the grid

    // if current_cell is not set, the ray is done
    if (!current_cell) {
      DEBUG(2,cout << "Ray left volume" << endl;);
      break;
    }

    // otherwise we scatter the ray
    scatter();

    // if current_cell is still set (it can be set to zero by scatter
    // if the ray loses russian roulette) check if it should be split
    if(current_cell)
      check_ray_split(T_queue_item::propagate);
  }

  // Coming out here, the ray is either below our intensity cutoff, or
  // it left the grid.  In any case, we are done with this ray.
}


/** Force scatters a ray once. */
template <typename dust_model_type, typename grid_type>
void
mcrx::xfer<dust_model_type, grid_type>::process_ray_forced ()
{
  DEBUG(2,cout << "Ray will be force-scattered\n";);
  ASSERT_ALL(ray_.traversed_column()==0);
  hpm::scoped_stage current(Forced);

  // determine the depth of the scattering stratum, using dtau_ and
  // tau_exit_ as temporaries

  scattering_reftau_ = b_.determine_stratum(ray_.intensity(), ray_max_i_,
					    dust_model, immediate_reemission_,
					    dtau_, tau_exit_);

  // Initialize forced scattering variables. Note that the ray can
  // already have nonzero l at this point if it originated outside
  // the grid. We must take into account that length does NOT
  // refer to length from origin when we use it.
  const vec3d ray_origin = ray_.position();
  traversed.clear();
  traversed.push_back(T_propagation_history (dust_model.zero_density(),
					     ray_.length(), T_cell_tracker()) );
      
  // 1.  Propagate ray to grid boundary without scattering,
  // accumulating optical depth, saving traversed and not adding
  // intensity directly

  // run propagate with do_scatter=true so we stop at our selected tau_e
  hpm::push_stage(Propagate);
  while( (!propagate<true, true, false, false>()) && current_cell);
  hpm::pop_stage();

  // 1.1 Calculate tau_exit. After this we don't need the
  // traversed_column anymore, so we reset it.
  dust_model.total_abs_length(ray_.traversed_column(), tau_exit_);
  onemexptau_exit_ = blitz::quiet_NaN(T_float());
  ray_.reset_traversed_column();
  
  const T_float onemexpreftau_exit = -expm1 (-b_.reference1(tau_exit_));

  DEBUG(2,cout << "Actual tau_exit " << b_.reference1(tau_exit_) << endl;);

  // From this point we continue just like this was normal forced
  // scattering, but first we push the existing ray, which is ready to
  // enter the next stratum. (Note that the stratum can end if we hit
  // tau_exit but also if propagate ended because we moved to another
  // task.) 
  if(current_cell.task()>=0) {
    // we did not exit the medium, which means there IS a next
    // stratum. We push that as a propagate ray
    DEBUG(2,cout << "Testing ray entering next stratum:\n\t";);
    original_intensity_ = ray_.intensity();
    ray_.bias(exp(-tau_exit_));
    // if the ray is too low intensity, we play RR before pushing it
    if(russian_roulette()) {
      DEBUG(2,cout << "\t";);
      push_ray(T_queue_item::propagate, ray_, current_cell, norm_);
    }
    ray_.set_intensity(original_intensity_);
  }
  else {
    DEBUG(2, cout << "Ray exited volume in stratum. No more strata\n";);
  }
				    
  // 1.5 Add intensities to cells. We can do this before drawing a
  // scattering point because we know that the intensities will
  // decline as exp(-tau) and we'll probably get less noise by
  // doing this analytically. It does however mean a lot of exp-ing.

  // use two temporary arrays for storage and swap them back and forth
  i_in_ = ray_.intensity();

  if(add_intensities_) {
    hpm::scoped_stage current(Intensities);
    for (typename std::vector<T_propagation_history >::const_iterator 
	   i = traversed.begin(), j = i++; i != traversed.end(); ++i, ++j) {
      assert (i != traversed.begin());
      assert (i- j == 1);
      const T_float dl = i->length() - j->length();

      // optical depth through this cell
      dn_ = i->n() - j->n();
      dust_model.total_abs_length(dn_, dtau_);

      assert(i_out_.data()!=i_in_.data());

      // The appropriate intensity to add is the length-averaged
      // mean intensity through the grid cell, which is the
      // expression below (see notes 080909). As dtau->0, we will
      // suffer cancellation and possibly division by zero, so we
      // use expansion for very small dtau.

      // (original_intensity is just used as a temporary here, and
      // loops are hand-coded for efficiency on these short
      // arrays)

      /*
	i_out_ = i_in_*exp(-dtau_);
	original_intensity_ = dl*norm_*where(dtau_<1e-6,
	0.5*(i_in_+i_out_),
	(i_in_-i_out_)/dtau_);
      */

      T_float* restrict dptr = original_intensity_.dataFirst();
      const T_float* restrict dtauptr = dtau_.dataFirst();
      const T_float* restrict normptr = norm_.dataFirst();
      const T_float* restrict iinptr = i_in_.dataFirst();
      T_float* restrict ioutptr = i_out_.dataFirst();
      const int nlambda = i_in_.extent(blitz::firstDim);
#pragma ivdep
      for(int l=0; l<nlambda; ++l) {
	ioutptr[l] = iinptr[l]*exp(-dtauptr[l]);

	dptr[l] = dl*normptr[l]* 
	  ((dtauptr[l]<1e-6) ? 
	   0.5*(iinptr[l]+ioutptr[l]) :
	   (iinptr[l] - ioutptr[l])/dtauptr[l]);
      }

      ASSERT_ALL(original_intensity_<1e60);
      i->cell().data()->get_absorber().add_intensity (original_intensity_);

      std::swap(i_in_, i_out_);
    }
  }
        
  // 2.  Draw an interaction optical depth within the distribution
  // If the medium is empty, there is a degeneracy. NO part of the
  // ray will scatter and we need to deal with this separately below.

  // we must take care of very small tau_exit values separately to
  // avoid getting poor precision when doing the log(1+x). We use an
  // approximation for very small x, as 0 is not acceptable. (see
  // bug 33).
  const T_float x=-rng_.rnd()*onemexpreftau_exit;
  scattering_reftau_ = x<-1e-5 ? 
    -log(1+x) : -x*(6.+x)/(6.+4.*x);

  DEBUG(2, cout << "Drew tau_s =" << scattering_reftau_ << endl;);
	
  // 3.1 We need to find out the full scattering_tau vector at all
  // wavelengths.  This is necessary to do before we start adding
  // intensities to the cells, because the ray must be
  // biased. Search the propagation history to find the scattering
  // cell. The iterator will point to the element containing the
  // cell which will scatter, and the n and l at the END of that
  // cell.
  typename std::vector<T_propagation_history >::const_iterator 
    scattering_i = 
    upper_bound(traversed.begin(), traversed.end(), 
		scattering_reftau_,
		boost::lambda::_1 < 
		boost::lambda::bind(&T_dust_model::total_reference_abs_length,
				    boost::cref(dust_model),
				    boost::lambda::bind(&T_propagation_history::n, 
							boost::lambda::_2),
				    boost::cref(b_)));
  assert((scattering_reftau_ > 0) || (scattering_i==traversed.end()));
  const T_cell_tracker& scattering_c(scattering_i->cell());

  // scattering_i will be traversed.end() if the medium is empty!
  // (but not if the medium is *practically* empty)
  if (scattering_i == traversed.end()) {
    // the medium is empty, end the ray
    DEBUG(2, cout << "Forced scattering in empty medium, no scattering." << endl;);
    current_cell.reset();	
    return;
  }

  const T_float endreftau = 
    dust_model.total_reference_abs_length(scattering_i->n(), b_);
  const T_float begreftau = 
    dust_model.total_reference_abs_length((scattering_i-1)->n(), b_);
  assert(begreftau <= scattering_reftau_);
  assert(endreftau >  scattering_reftau_);

  // Fractional optical depth traversed at the scattering point,
  // which is the same as the fractional column densities
  // traversed
  const T_float fn = 
    (scattering_reftau_ - begreftau)/(endreftau - begreftau);

  // The traversed column densities of the ray at the scattering point
  const T_densities scattering_n 
    ( (scattering_i-1)->n() + 
      (scattering_i->n()-(scattering_i-1)->n()) * fn );

  // Find fractional distance through the cell at the scattering
  // point 
  const T_float dl = scattering_i->length() - (scattering_i-1)->length();
  // (remember that to get to the scattering point we need to
  // subtract the length the ray had gone when it hit the grid)
  const T_float scell_l = 
    (scattering_i-1)->length()-traversed.begin()->length();
  const T_float f = 
    scattering_c.column_to_distance_fraction
    (fn, ray_origin + scell_l*ray_.direction(), ray_.direction(), dl);

  // The path length of the ray from entering the grid to scattering point
  const T_float scattering_l = scell_l + f*dl;

  // The full scattering optical depth vector
  dust_model.total_abs_length(scattering_n, scattering_tau_);

  // 3.2 Calculate bias factor due to optical depth of scattering
  // based on the forced probabilities. 
  DEBUG(10,T_lambda bias(b_.bias_factor(scattering_tau_*exp(-scattering_tau_)/(onemexptau_exit_))); \
	if(!condition_all(bias<ray_max_i_)) \
	  cout << "WARNING: Large bias factor  " << max(bias)<< " at lambda " << maxIndex(bias) << " biaser " << b_ << " scattering reftau " << scattering_reftau_ << " reftau_exit " << b_.reference1(tau_exit_) << " at "<<__FILE__ <<':'<<__LINE__ <<endl;);

  // Calculate the bias factor, ignoring the tau_exit part which
  // cancels. Instead, we just apply the reference part to the
  // intensity below. There is no issue with 0/0 here because
  // scattering_reftau_ >0.
  ray_.bias(b_.bias_factor(scattering_tau_*exp(-scattering_tau_)));

  // Check for degenerate case where the ENTIRE ray exited and it
  // doesn't make sense to deal with the forced scattering ray and
  // we bail out and the ray is done.  this was already checked
  // for above so should never happe
  assert(scattering_l>0);

  // 4. update the ray to the interaction point, and scatter it
  ray_.set_position(ray_origin);
  // set ray intensity to part that will scatter. This partially
  // undoes the bias factor above... we should just combine it.
  ray_.bias(onemexpreftau_exit);
  ray_.propagate(scattering_l, scattering_n);
  current_cell = scattering_c;
  assert (current_cell);

  // This assertion is not quite safe because ray_origin can be on
  // the grid boundary in which case locate can give zero.  In
  // cases of suddenly very high optical depth in cells, where the
  // ray will scatter very close to the cell face, numerical
  // effects can also make the ray be in the immediately preceding
  // cell. This is more likely to happen if the emission point is
  // far away, as the cells will be small dl's on top of a large
  // value just to make it to the grid. can we reformulate the
  // math here to be less sensitive to this?
  current_cell.assert_contains(ray_.position());

  // should we do russian roulette here? because of the domain walls
  // we do many strata and they end up being very weak.
  russian_roulette();

  // This completes this ray segment. Scattering will happen in the
  // mainloop since current_cell is valid.
}

/** Propagates a ray through one unforced scattering. */
template <typename dust_model_type, typename grid_type>
void
mcrx::xfer<dust_model_type, grid_type>::process_ray_unforced()
{
  hpm::scoped_stage current(Unforced);
  ASSERT_ALL(ray_.traversed_column()==0);

  // The ray is not forced to scatter, we just need to draw
  // scattering tau from the un-forced distribution
  scattering_reftau_ = - log (rng_.rnd ());

  traversed.clear();
  traversed.push_back(T_propagation_history (dust_model.zero_density(),
					     ray_.length(), T_cell_tracker()) );
      
  // Propagate ray until scatter or exit, save traversed
  // intensities, but do not add to cells directly
  hpm::push_stage(Propagate);
  while( (!propagate<true, true, false, false>()) && current_cell);
  hpm::pop_stage();

  // calculate bias factor due to optical depth of scattering
  // based on the unforced probabilities. We need to bias BEFORE
  // adding the intensities because the intensities to add depend
  // on whether the ray exited or not.
  dust_model.total_abs_length(ray_.traversed_column(), scattering_tau_);

  // the bias depends on whether the ray exited the grid or not
  if (current_cell) {
    // bias factor for rays that did not exit is exp(-tau)*tau.
    DEBUG(1,T_lambda bias(b_.bias_factor(exp(-scattering_tau_)* \
					scattering_tau_));	  \
	  if(!condition_all(bias<ray_max_i_))		  \
	    cout << "WARNING: Large bias factor  " << max(bias)<< " at lambda " << maxIndex(bias) << " biaser " << b_ << " scattering reftau " << scattering_reftau_ << " at " << __FILE__ <<':' <<__LINE__ <<endl;);
    ray_.bias(b_.bias_factor(exp(-scattering_tau_)*scattering_tau_));
  }
  else {
    // bias factor for rays that exit is exp(-tau)
    DEBUG(1,T_lambda bias(b_.bias_factor(exp(-scattering_tau_))); \
	  if(!condition_all(bias<ray_max_i_))		    \
	    cout << "WARNING: Large bias factor  " << max(bias)<< " at lambda " << maxIndex(bias) << " biaser " << b_ << " scattering reftau " << scattering_reftau_ << " at " << __FILE__ <<':' <<__LINE__ <<endl;);

    ray_.bias(b_.bias_factor(exp(-scattering_tau_)));
  }

  // add intensities now that we have biased the ray
  if(add_intensities_) {
    hpm::scoped_stage current(Intensities);
    for (typename std::vector<T_propagation_history >::const_iterator 
	   i = traversed.begin(), j = i++; i != traversed.end(); ++i, ++j) {
      assert (i != traversed.begin());
      assert (i- j == 1);
      const T_float dl = i->length() - j->length();
      add_intensity (i->cell(), dl);
    }
  }

  // This completes this ray segment. Scattering will happen in the
  // mainloop if the ray did not exit the volume and current_cell is
  // set
}



/** Pushes a ray onto the local or remote ray queue as determined by
    the action. Makes sure the ray and normalization don't keep
    referring to the xfer object members.  */
template <typename dust_model_type, typename grid_type>
void
mcrx::xfer<dust_model_type, grid_type>::
push_ray(T_action action, const T_ray& r, const T_cell_tracker& c,
	 T_lambda& norm, int par) 
{
  DEBUG (1, std::ostringstream s; std::ostringstream s1;		\
	 if(thread()<0)							\
	   s << "Master task " << task();				\
	 else								\
	   s << "Worker " << task() << "-" << thread();			\
	 if(c.task()==task() || c.task() <0)				\
	   s1 << "local push";						\
	 else								\
	   s1 << "remote send to " << c.task();			\
	 printf("%s %s: action %s, param %d, max int %7.3g, depth %ld\n", s.str().c_str(), s1.str().c_str(), action_strings[action], par, max(r.intensity()),local_queue()->size()););

  assert(action != T_queue_item::invalid);
  assert(c.task()>=0 && c.task()<mpi_size());
  g.assert_in_grid(r.position());

  const T_queue_item rso(action, independent_copy(r),
			 c.code(), independent_copy(norm), par);

  if(action==T_queue_item::propagate)
    assert(rso.ray_.traversed_column()(0)==0);

  if(c.task() >=0 && c.task() != task()) {
    hpm::scoped_stage current(Ray_send);
    mpi_master_->thread_send_ray(rso, thread(), c.task());
  }
  else {
    local_pending_->push(rso);
  }
}


/** Pops a ray, including its cell tracker and action, off of the
    local_pending queue.  */
template <typename dust_model_type, typename grid_type>
mcrx::xfer<dust_model_type, grid_type>::T_action
mcrx::xfer<dust_model_type, grid_type>::pop_ray(int& r)
{
  T_queue_item rso;
  bool popped = local_pending_->try_pop(rso);
  if(!popped)
    return T_queue_item::invalid;

  assert(rso.action_!=T_queue_item::invalid);

  ray_ = rso.ray_;
  g.initialize_tracker(rso.cell_, ray_.position(), current_cell);
  norm_ = rso.norm_;
  if(rso.action_==T_queue_item::scatter)
    r=rso.param_;

  DEBUG (1, cout << "Popping ray, action " << rso.action_ << " with max intensity " << max(ray_.intensity()) << endl;);
  
  return rso.action_;
}

/** Check ray max intensity and possibly split. The action and
    parameter are set in case the ray is split. */
template <typename dust_model_type, typename grid_type>
void
mcrx::xfer<dust_model_type, grid_type>::
check_ray_split(T_action a, int par)
{
  ASSERT_ALL(ray_.intensity() == ray_.intensity());

  const T_float maxi = max(ray_.intensity());
  DEBUG(3,cout << "Checking ray intensity: max I " << maxi << endl;);
  if(maxi > ray_max_i_) {
    DEBUG(1,cout << "Warning: ray intensity exceeds max: " << maxi/ray_max_i_ << endl;);
  }

  // don't split like this right now
  return;


  if ((ray_max_i_ == 0) || !current_cell) 
    // no max i set, check unnecessary
    return;

  DEBUG(3,cout << "Checking ray intensity: max I " << maxi << endl;);
  if(maxi > ray_max_i_) {
    // ray should be split
    const int n = int(ceil(maxi/ray_max_i_));
    DEBUG(1,cout << "Splitting ray with action " << a << ", parameter " << par << ", max intensity " << maxi << " into " << n << endl;);

    ray_.bias(1./n);
    for(int i=0;i<n-1;++i) {
      // we push n-1 onto the stack, because the current ray is still current
      push_ray(a, ray_, current_cell, norm_, par);
    }
  }
}


/** Emits a ray according to the emission distribution, and calculates
    the direct flux to the cameras. */
template <typename dust_model_type, typename grid_type>
void mcrx::xfer<dust_model_type, grid_type>::emit(bool push_rays, long nray_norm)
{
  hpm::scoped_stage current(Emit);

  // We call on the emission object to emit the ray and give us the
  // normalization, velocity, and angular distribution of emission. 
  T_float dummy;
  ray_distribution_pair<T_lambda> e = emission().emit(b_, rng_, dummy);
  // The ray returned in e has uninitialized column density.
  ray_ = *e.ray_pointer();
  norm_ = e.normalization();

  // if nray_norm is set, we normalize the normalization (!) of the
  // ray for that many rays. This ensures luminosity conservation on
  // several tasks.
  if(nray_norm>0)
    norm_ *= (1./nray_norm);

  /// \todo this needs to be evaluated.
  /// if the normalization is zero, we can safely set the intensity to
  /// be zero, too. this eliminates any chance of getting high bias
  /// factors at those wavelengths
  const T_float sed_dynamic_range=1e-3;
  if (sed_dynamic_range>0) {
    const T_float maxnorm = max(norm_);
    ray_.set_intensity(where(norm_/maxnorm < sed_dynamic_range,
			     norm_/maxnorm/sed_dynamic_range, 1.0));
    norm_ = where(norm_/maxnorm < sed_dynamic_range,
		  maxnorm*sed_dynamic_range, norm_);
  }
  ASSERT_ALL(ray_.intensity() >= 0);

  // If the ray is inside the grid, then we can find its location once
  // here. If not, we need to do it separately for all the camera rays
  // in emerge_from_point. Note that the position here may be in
  // another task, in which case the camera rays will be pushed
  // immediately to that task
  if(g.in_grid(ray_.position())) {
    hpm::scoped_stage current(Locate);
    current_cell = g.locate(ray_.position(), thread(), false);
  }
  else
    current_cell.reset();

  // Emerge the direct radiation getting to the cameras
  emerge_from_point (*e.distribution(), e.velocity(), push_rays);

  // now set the doppler shift of the ray based on particle velocity and ray
  // direction (can't do this before the direct radiation has been treated.)
  const T_float d = 1+dot(e.velocity(), ray_.direction())/constants::c0;
  ray_.add_doppler(d);
  DEBUG(3,std::cout << "Dopplerfactor " << d << " in emit"<< std::endl;);

  // push the ray onto the queue if we have been asked to push all
  // rays
  if(push_rays) {
    push_ray(T_queue_item::propagate, ray_, current_cell, norm_);
  }
  else if (current_cell.task()<0) {
    // if task<0, we were outside of the grid above. in that case,
    // propagate the ray to the grid boundary and locate the current
    // cell. If we are on another task, then we fall through to the
    // worker loop where it's pushed.
    propagate_external ();
  }
}


/** Propagates the ray from its position to the end of the current
    grid cell or until an interaction takes place.

    If do_scatter is false, we ignore the value of scattering_reftau
    and just keep on propagating until we hit the boundary or
    max_length. (This is used for forced scattering.)

    If keep_traversed is true, the traversed cells are added to the
    propagation_history vector, whereas if add_intensity_here is true, the intensities are
    added to the cells immediately. (Because these two flags are only used in
    a static sense, they are made template parameters so the compiler
    can do the appropriate optimization.)

    If integrate_ray is true, then the intensity of the ray will be
    changed by integrating the RTE through the cell. This will only
    give sensible results if the other options are false. 

    The return value is true if we propagated to max_length (if do_scatter
    is false), or if we scattered if it is true.
*/
template <typename dust_model_type, typename grid_type>
template <bool do_scatter, bool keep_traversed, bool add_intensity_here, 
	  bool integrate_ray>
bool
mcrx::xfer<dust_model_type, grid_type>::
propagate (T_float max_length)
{
  assert (current_cell);
  assert (max_length >= ray_.length());

  // save the current cell so we can access it.
  const T_cell_tracker cc(current_cell);

  // Find the cell boundary in the direction of travel. NOTE THAT for
  // efficiency's sake THIS UPDATES current_cell and dn_. If you
  // want the cell that we entered in, use the cc tracker object.

  // (this routine also makes sure we don't propagate past max_length,
  // resets current_cell if we hit max_length, and ensures the column
  // density returned in dn_ is appropriate in case we stop due to max
  // length)

  bool hit_max;
  const T_float dl
    (current_cell.intersection_from_within(ray_, max_length, dn_, hit_max));
  
  assert (dl > -1e-10);

  // Check if we have reached optical depth for absorption/scattering
  const T_float reftau = 
    dust_model.total_reference_abs_length(ray_.traversed_column (), b_);
  const T_float dreftau = dust_model.total_reference_abs_length(dn_, b_);
  assert (dreftau >= -1e-10);
  
  if (integrate_ray) {
    // we are doing ray integrations
    assert(!do_scatter);assert(!add_intensity_here);

    // calculate full optical depth vector through cell
    dust_model.total_abs_length(dn_, dtau_);

    i_in_ = ray_.intensity();

    // Put the source function in i_out_. This is
    // (1-exp(-tau))*epsilon/(4pi kappa). We get kappa=dtau/l, except
    // in cases where dtau_==0 this gives NaN. In those cases we know
    // that the source function is zero at those wavelengths, so we
    // just ignore it.
    i_out_ = dl/(4*constants::pi*cc.volume())*
      where(dtau_>0.0, cc.data()->get_emitter().get_emission()/dtau_, 0.0);
    
    ray_.set_intensity(i_in_*exp(-dtau_) - i_out_*expm1(-dtau_));
    ASSERT_ALL(ray_.intensity() >= 0);    

    // update ray and we are done
    ray_.propagate(dl, dn_);

    return hit_max;
  }

  // true if scattering should happen in this grid cell
  const bool scatter_now = 
    do_scatter && (scattering_reftau_ < reftau + dreftau);
  
  // 2 things can happen: scattering or just moving to the edge of the
  // grid cell.  
  if(scatter_now) {
    // Scattering is enabled and is to take place in this grid cell

    // column density fraction to scattering event
    const T_float fn = (scattering_reftau_ - reftau)/dreftau;

    // Calculate fractional distance to absorption/scattering event
    // based on fractional column density (this needs to be done by
    // the cell_tracker since the cells may not have uniform density).
    const T_float fl = 
      cc.column_to_distance_fraction(fn ,ray_.position(), ray_.direction(), dl);

    assert (fl < 1);
    // This assertion can fail if the density in the cell is so high that
    // the scattering distance underflows to 0. This usually means the density
    // in the cell is catastrophically wrong, like wrong units.
    assert (fl > 0);

    // Update ray position to pos of scattering event
    ray_.propagate (fl*dl, fn*dn_);
  }
  else {
    // Either scattering is not turned on or it's not time for
    // scattering. In both cases we just propagate to the edge of the
    // cell.

    ray_.propagate(dl, dn_);      
  }

  if (keep_traversed)
    // Update the propagation history
    traversed.push_back(T_propagation_history 
			(independent_copy(ray_.traversed_column()),
			 ray_.length(),
			 cc));
  if (add_intensity_here)
    // add intensity to the cell
    add_intensity (cc, dl);

  if (scatter_now || hit_max) {
    // we scattered in this cell, which means we are still in the cell
    // we entered. In this case we need to roll back current_cell,
    // which was automatically set to the next cell by
    // intersection_from_within
    current_cell = cc;
  }
  
  return do_scatter ? scatter_now : hit_max;
}


/** Scatters the ray.  The ray is scattered at the current position by
    calling the scatter function of the absorber in the current cell.
    After this, flux emerging to the cameras from the scattering is
    calculated.  If the ray is completely absorbed (too low intensity)
    after the scattering, current_cell is set to 0 to indicate that
    this ray is done. */
template <typename dust_model_type, typename grid_type>
void
mcrx::xfer<dust_model_type, grid_type>::scatter(int resume_with)
{
  hpm::scoped_stage current(Scatter);
  assert(resume_with<0);
  assert (current_cell);
  DEBUG(2,cout << "Scattering ray. n_scat=" << ray_.scatterings() << endl;);

  T_float dummy;
  int i;
  i = 
    dust_model.draw_scatterer(current_cell.data()->get_absorber().densities(),
			      b_, ray_, rng_, dummy);


  assert (condition_all(ray_.intensity() >= 0));    

  if(immediate_reemission_) {
    assert(current_emergence_->n_cameras()==0);
    // in this mode, we draw either a scattering or an absorption
    // (i.e., effective scattering) event
    const T_float Pscat = b_.reference1(dust_model.get_scatterer(i).albedo());
    if(rng_.rnd() < Pscat) {
      // scattering. this means we bias instead of attenuate wrt the albedo
      ray_.bias(b_.bias_factor(dust_model.get_scatterer(i).albedo()));
      DEBUG(2,cout<<"Doing scattering, Pscat=" << Pscat << " max I: " << max(ray_.intensity()) << endl;);
      dust_model.get_scatterer(i).scatter(b_, ray_, rng_, dummy);
    }
    else {
      // effective scattering. this means we bias wrt 1-albedo.
      ray_.bias(b_.bias_factor(1.0-dust_model.get_scatterer(i).albedo()));
      DEBUG(2,cout<<"Doing reemission, Pscat=" << Pscat << " max I: " << max(ray_.intensity())  << endl;);
      current_cell.data()->get_emitter().effective_scatter(ray_, norm_, rng_);
    }
  }
  else {
    // attenuate ray intensity based on albedo. In debugging mode, we
    // keep track of this for energy conservation checks.
#ifdef MCRX_DEBUG_LEVEL
    const T_lambda pre_intensity(independent_copy(ray_.intensity()));
#endif
    ray_.bias(dust_model.get_scatterer(i).albedo());

    DEBUG(1,if(!condition_all(ray_.intensity()<ray_max_i_))		\
	      cout << "WARNING: Large ray intensity  " << max(ray_.intensity())<< " at lambda " << maxIndex(ray_.intensity()) << " biaser " << b_ << " at "<<__FILE__ <<':'<<__LINE__ <<endl;);
      
    // we only do this when debugging because it requires mutex
    // locking and isn't used except to test for energy conservation.
    DEBUG(1,dust_model.add_absorbed					\
	  (T_lambda((pre_intensity - ray_.intensity())*norm_)););
    DEBUG(1,current_cell.data()->get_absorber().add_absorbed	\
	  ((pre_intensity - ray_.intensity())*norm_););
      
    // At this point the ray has the intensity that is outgoing from
    // the grain, but yet unaffected by outgoing direction. This is
    // the appropriate intensity to be sent to cameras. Note that we
    // have NOT drawn a scattering direction yet, and
    // emerge_from_point restores the ray direction. Conceptually, the
    // ray has already scattered, so we manually increase the nscat
    // counter so it represents correctly the light after scattering.
    ++ray_.scatterings();
    emerge_from_point (dust_model.get_scatterer(i).phase_function(),
		       current_cell.data()->get_absorber().velocity(), false);
    // and set it back
    --ray_.scatterings();
      
    check_ray_split(T_queue_item::scatter, i);

    {
      // save the incoming direction for doppler calculation
      const vec3d indir(ray_.direction());
      
      // Now we ask the grain to scatter the ray in a direction and apply
      // biasing due to outgoing direction. We don't care about the
      // returned distribution now.
      T_float dummy;
      dust_model.get_scatterer(i).scatter(b_, ray_, rng_, dummy);
    
      // Apply doppler shift for scattering event
       const T_float d = 1.0 + dot(current_cell.data()->get_absorber().velocity(),
				   ray_.direction() - indir)/constants::c0;
       ray_.add_doppler(d);
       DEBUG(3,cout << "Dopplerfactor " << d << ", velocity " << current_cell.data()->get_absorber().velocity() << " in scatter"<< endl;);
     }
   }

   // play russian roulette to see if ray should be killed
   current_cell.reset_intersections();
   russian_roulette();

   // This completes the scattering event.
 }

 /** Plays a Russian roulette game to see if the ray should be
     killed. A true return value means the ray survived. The factor is
     used to change the threshold in relation to ray_min_i_. */
 template <typename dust_model_type, typename grid_type>
 bool
 mcrx::xfer<dust_model_type, grid_type>::russian_roulette(T_float factor)
 {
   const T_float maxi = max(ray_.intensity());

   if (maxi > ray_min_i_*factor) {
     // max intensity above RR threshold, always survives
     DEBUG(2,cout << "Ray max intensity " << maxi << ", not playing RR\n";);
     return true;
   }

   // If ray has zero intensity it doesn't matter if it passed, it's dead
   if(maxi == 0) {
     DEBUG(2,cout << "Ray intensity ==0, killing"<<endl;);
     current_cell.reset();
     return false;
   }

   const double P_rr=maxi/(ray_min_i_*factor); 
   DEBUG(2,cout << "Ray max intensity " << max(ray_.intensity()) << ",  playing Russian Roulette with p=" << P_rr << "... ";);

   if(rng_.rnd() < P_rr) {
     // survived, boost intensity by probability factor
     DEBUG(2,cout <<"Won" << endl;);
     ray_.bias(1./P_rr);
     return true;
   }

   // lost, kill it. This is done by resetting current_cell.
   DEBUG(2,cout <<"Lost" << endl;);
   current_cell.reset();
   return false;
 }


 /** Calculates the radiation that would reach the cameras from the
     current ray position as a result of emission or scattering. If
     push_camera_rays is true, it only generates the camera rays and
     immediately pushes them onto the queue. */
 template <typename dust_model_type, typename grid_type>
 void
 mcrx::xfer<dust_model_type, grid_type>::
 emerge_from_point (const angular_distribution<T_lambda>& d,
		    const vec3d& velocity, bool push_camera_rays)
 {
   // if there are no cameras, this whole function is a no-op and we
   // can return immediately
   if(current_emergence_->n_cameras()==0) return;
   // we can also immediately check that the ray is within our
   // requested number of scatterings
   if ( (ray_.scatterings()<n_scat_min_) || (ray_.scatterings()>n_scat_max_) )
     return;

   assert (condition_all(ray_.intensity() >= 0));   
   hpm::scoped_stage current(Emerge);

   // Save these so we can restore them when we are done
   const T_cell_tracker cc = current_cell;
   const vec3d original_direction = ray_.direction();
   const T_float original_doppler = ray_.doppler();
   const vec3d ray_origin = ray_.position();
   original_intensity_ = ray_.intensity();

   // go through the cameras one by one
   for(int c=0; c < current_emergence_->n_cameras(); ++c) {

     // Set up the ray to go to camera c
     ray_.set_position(ray_origin );
     ray_.set_doppler(original_doppler);
     const vec3d delta = 
       current_emergence_->get_camera(c).position () - ray_.position();
     const T_float dist = mag (delta);
     ray_.set_direction(delta/dist );
     current_cell.reset_intersections();

     // first, get the camera sensitivity function and put it in i_in_
     // as a temporary. Note that the camera distribution function uses
     // the *outgoing* direction from the camera, just like an emitter.
     current_emergence_->get_camera(c).distribution()->distribution_function
       (-ray_.direction(), vec3d(), i_in_);

     // it's possible the ray is outside of the camera field of view,
     // in which case we don't need to trace it.
     if(condition_all(i_in_==0)) {
       continue;
     }

     // then get the scattering distribution function, using
     // intensity_to_camera_ as a convenient temporary.
     d.distribution_function(ray_.direction(), 
			     original_direction,
			     intensity_to_camera_);

     // we also divide out the 1/r^2 into intensity_to_camera_ here,
     // since after we enter camera_ray we don't know the scattering
     // point any more
     intensity_to_camera_ *= 1./(dist*dist);

     // intensity_to_camera_ and i_in_ are saved until we need the intensity

     // 1.1 assign doppler
     const T_float doppler = 1.0 + dot(velocity, 
				       ray_.direction() - original_direction)/
       constants::c0;
     ray_.add_doppler(doppler);
     DEBUG(3,cout << "Dopplerfactor " << doppler << " in emerge from point"<< endl;);

     // push the ray onto the queue if we have either been asked to
     // push all rays, or if current_cell is set to another task
     if(((current_cell.task()>=0) && (current_cell.task()!=task())) ||
	push_camera_rays) {
       // here we factor intensity_to_camera_ and i_in_ into the
       // normalization when pushing. this prevents polluting the ray
       // intensity with these factors which makes it easier to RR it
       i_out_ = norm_*intensity_to_camera_*i_in_;
       push_ray(T_queue_item::emerge, ray_, current_cell, 
		i_out_, c);
     }
     else
       camera_ray(c, dist);

     current_cell = cc; 
     ray_.set_intensity(original_intensity_);
   }

   // Restore ray to the incoming state
   ray_.set_position(ray_origin );
   ray_.set_direction(original_direction);
   ray_.set_doppler(original_doppler);
   current_cell.reset_intersections();
 }


 /** Propagates the current ray to camera c and adds it. It should
     already have the correct direction, position, etc. Note that this function uses intensity_to_camera_ and i_in_ from emerge_from_point. For camera_rays coming from outside, they must be set to 1.  */
 template <typename dust_model_type, typename grid_type>
 void 
 mcrx::xfer<dust_model_type, grid_type>::
 camera_ray(int c, T_float dist)
 {
   hpm::scoped_stage current(Camera_ray);

   // if dist<0, it's not set. then we need to recalculate dist here
   // since it's not saved in the queue
   if(dist<0) {
     const vec3d delta = 
       current_emergence_->get_camera(c).position () - ray_.position();
     dist = mag(delta);
   }

   // 2.  See if the ray is outside the grid volume, and in that case,
   // if it will hit the grid volume (not going past the camera). We
   // only do this if the tracker has an invalid task. If the task is
   // valid, even if the tracker is not, then we have already
   // determined that the ray is on another task and we should fall
   // through to the push below
   if (!current_cell) {
     assert(current_cell.task()<0);
     propagate_external (dist);
   }

   // 3.  Assuming it hit the grid, propagate the ray through the
   // grid (up to max length=dist), without scattering or
   // saving/adding intensities.
   hpm::push_stage(Propagate);
   while (current_cell && !propagate<false, false, false, false> (dist));
   hpm::pop_stage();
   
   // attenuate the intensity of the ray according to the optical depth
   // so far NOW, rather than carry the optical depth. This is so we
   // can play RR on the camera rays to avoid shipping weak rays to
   // other tasks.

   // calculate the optical depth and put it in dtau_ (we put
   // this result in a temporary because total_abs_length uses an
   // explicit loop to avoid reduction overhead.
   dust_model.total_abs_length(ray_.traversed_column(), dtau_);
   ray_.reset_traversed_column();
   ray_.bias(exp(-dtau_));

   // If we come out here with an invalid tracker that points to
   // another task, push the ray. Note that this can happen because we
   // propagate out of the domain or because the cell search in
   // propagate_external returned a foreign task. If we left the
   // domain, task will be -1.
   if(current_cell.task() >=0 && current_cell.task()!=task()) {
     // we RR camera rays, too, to avoid shipping very weak rays to
     // other tasks, but with a lower threshold.
     if(russian_roulette(.1)) {
       // here we factor intensity_to_camera_ and i_in_ into the
       // normalization when pushing. this prevents polluting the ray
       // intensity with these factors which makes it easier to RR it
       i_out_ = norm_*intensity_to_camera_*i_in_;
       push_ray(T_queue_item::emerge, ray_, current_cell, i_out_, c);
     }
     ray_.reset_traversed_column();
     return;
   }

   // 4.  Add the ray to the camera

   ASSERT_ALL(intensity_to_camera_ >= 0);    
   DEBUG(2,cout << "Adding ray to camera, max i " \
	 << max(ray_.intensity()) << endl;);
   DEBUG(1, T_lambda temp(ray_.intensity()); \
	 if( max(temp) > ray_max_i_ ){ \
	   cout << "WARNING: Adding ray to camera, max intensity " \
		<< max(temp) << " at lambda " << maxIndex(temp) \
		<< " biaser " << b_ \
		<< endl;};);

   add_to_camera(current_emergence_->get_camera(c), ray_.position(), 
		 ray_.intensity()*norm_*intensity_to_camera_*i_in_,
		 ray_.doppler());
 }


 /** Propagates a ray outside the grid to the grid boundary, if it hits
     the grid, and sets current_cell. Note that current_cell may be on
     another task when coming out here. */
 template <typename dust_model_type, typename grid_type>
 void 
 mcrx::xfer<dust_model_type, grid_type>::
 propagate_external (T_float max_length)
 {
   assert (!current_cell);

   // ask the grid whether the ray will enter a valid cell
   hpm::push_stage(Locate);
   const std::pair<T_cell_tracker, T_float> tl = 
     g.intersection_from_without(ray_, thread());
   hpm::pop_stage();

   current_cell = tl.first;
   if (current_cell.task()>=0 && tl.second < max_length) {
     // the ray will enter a valid cell (but maybe on another task)
     // within max_length
     ray_.propagate(tl.second, dust_model.zero_density());
   }

   assert(ray_.length()<=max_length);
 }


 /** Emits a ray and immediately adds the radiation to the cameras
     without scattering, as if in a vacuum. This is the "budget
     version" of shoot and is used when we just want to make an image
     of the emission without doing any radiative transfer. */
 template <typename dust_model_type, typename grid_type>
 void mcrx::xfer<dust_model_type, grid_type>::shoot_isotropic (long nray_norm)
 {

   // We call on the emission object to emit the ray (the peculiar
   // maneuver is needed because we need to return the reference to the
   // angular_distribution object as well as the emitted ray)
   T_float dummy;
   ray_distribution_pair<T_lambda> e = emission().emit(b_, rng_, dummy);
   ray_ = *e.ray_pointer();
   norm_ = e.normalization();

   // if nray_norm is set, we normalize the normalization (!) of the
   // ray for that many rays. This ensures luminosity conservation on
   // several tasks.
   if(nray_norm>0)
     norm_ *= (1./nray_norm);

   // go through the cameras one by one
   for(int c=0; c < current_emergence_->n_cameras(); ++c) {

     // 1. Find the direction to the camera c 
     const vec3d delta = 
       current_emergence_->get_camera(c).position () - ray_.position();
     const T_float dist = mag (delta);
     ray_.set_direction(delta/dist );

     // 2. assign doppler
     const T_float doppler = 1.0 + dot(e.velocity(), ray_.direction())/
       constants::c0;
     ray_.set_doppler(doppler);
     DEBUG(3,cout << "Dopplerfactor " << doppler << " in shoot_isotropic"<< endl;);

     // 3.  Add the flux to the camera

     e.distribution()->distribution_function(ray_.direction(), 
					     vec3d(), 
					     intensity_to_camera_);

     current_emergence_->get_camera(c).distribution()->distribution_function
       (-ray_.direction(), vec3d(), i_in_);

     intensity_to_camera_ *=
       ray_.intensity() * norm_ * i_in_ * (1.0/(dist*dist));

     assert (condition_all(intensity_to_camera_ >= 0.));    

     add_to_camera(current_emergence_->get_camera(c), ray_.position(), 
		   intensity_to_camera_, ray_.doppler());
   }
 }


/** Add intensity to the cell. */
template <typename dust_model_type, typename grid_type>
inline void 
mcrx::xfer<dust_model_type, grid_type>::
add_intensity (const T_cell_tracker& c,
	       T_float dl)
{
  add_intensity(c, dl, ray_.intensity());
}


/** Add intensity to the cell. */
template <typename dust_model_type, typename grid_type>
template <typename T>
inline void 
mcrx::xfer<dust_model_type, grid_type>::
add_intensity (const T_cell_tracker& c,
	       T_float dl,
	       const blitz::ETBase<T>& intensity)
{
  if(!add_intensities_) 
    return;

  const T& intens=intensity.unwrap();

  assert(c);
  ASSERT_ALL(dl*norm_*intens<1e60);

  DEBUG(3,cout << "Adding intensity max: " << max(dl*norm_*intens) << endl;);

  c.data()->get_absorber().add_intensity (dl*norm_*intens);
}


/** Initializes the intensity integration by pushing all pixel
    positions onto the pixel queue. Since we don't yet know what task
    they'll end up on, we just do this on the master task. */
template <typename dust_model_type, typename grid_type>
void
mcrx::xfer<dust_model_type, grid_type>::
init_ray_integration()
{
  if(task()==0)
    {
    for(int c=0; c < current_emergence_->n_cameras(); ++c) {
      const int xs=current_emergence_->get_camera(c).xsize();
       const int ys=current_emergence_->get_camera(c).ysize();
       for(int x=0; x < xs; ++x)
	 for(int y=0; y < ys; ++y)
	   px_queue_->push
	     (std::make_pair(c,blitz::TinyVector<int,2>(x, y)));
    }
  }
}


/** Determines the intensity in the specified camera pixel by
    integrating the radiative transfer equation along the ray. This
    requires that the grid be a combined absorber/emitter grid with
    computed luminosities.  The starting point of the ray is
    determined by deprojecting the specified image pixel into a
    direction vector, and then extending this to the back of the
    grid. The image position is specified in integral pixels, since
    this initialized the ray to be at level 0 of the ray quad-tree.
    The ray nscat parameter is used to encode the pixel position, and
    the queue item parameter the camera and tree level.
*/
template <typename dust_model_type, typename grid_type>
void
mcrx::xfer<dust_model_type, grid_type>::
integrate_ray(const blitz::TinyVector<int, 2>& px, int c, 
	      bool emission_integration)
{
  current_cell.reset();
  const vec3d pos = current_emergence_->get_camera(c).position();
  ray_.set_position(pos);
  ray_.set_doppler(1);

  // check that pixel position fits in the scattering parameter
  assert((px[0]&0xffff0000) == 0);
  assert((px[1]&0xffff0000) == 0);
  ray_.scatterings() = (px[0]<<16) | px[1];

  // get direction from the pixel position
  ray_.set_direction(px2dir(c, ray_.scatterings(), 0));

  // Extend the ray outward to a grid boundary
  const bool in_grid = g.in_grid(pos);
  T_float len = ray_.intersect_cube(g.getmin(), g.getmax(), in_grid);

  if(len!=len)
    // the ray does not hit the grid, so intensity is zero. We don't
    // need to do anything
    return;

  // If we were in the grid, we are now at the exit boundary and we
  // are done. However, if we were outside, we are at the near
  // boundary and need to extend again to get the far one.
  if(!in_grid) {
    ray_.propagate(len);
    len = ray_.intersect_cube(g.getmin(), g.getmax(), true);
    assert(len==len);
  }

  ray_.propagate(len);
  ray_.set_direction(-ray_.direction());
  len = ray_.length();
  ray_.reset_length();
  
  // set ray intensity to zero
  ray_.set_intensity(0.0);

  // and integrate it back to the camera
  integrate_ray(c, emission_integration, len);
}

/** Returns (outgoing) direction vector corresponding to the encoded
    pixel position and level in the quadtree. */
template <typename dust_model_type, typename grid_type>
    mcrx::vec3d
mcrx::xfer<dust_model_type, grid_type>::
px2dir(int cam, int px, int level) const
{
  blitz::TinyVector<T_float, 2> pixel;
  pixel[0] = (2.0*(px >> 16)+1.0)/(1<<(level+1));
  pixel[1] = (2.0*(px & 0x0000ffff)+1.0)/(1<<(level+1));
  
  DEBUG(2,printf("Converting ray %d/%d to pixel (%f,%f)\n",px,level,pixel[0],pixel[1]););

  const vec3d dir(current_emergence_->get_camera(cam).deproject(pixel));
  return dir;
}

/** Checks if the solid angle subtended by the ray is larger than the
    solid angle subtended by the current cell, and if so splits the
    ray into 4 subrays. cam is the target camera and dist is the
    distance to it. */
template <typename dust_model_type, typename grid_type>
bool
mcrx::xfer<dust_model_type, grid_type>::
subdivide_integration_ray(int cam, T_float dist, T_float px_sr, int level, 
			  bool emission_integration)
{
  const vec3d cam_pos = current_emergence_->get_camera(cam).position();
  {
    const vec3d delta = cam_pos - ray_.position();
    const T_float dist2 = mag(delta);
    assert(almostEqual(dist, dist2, 100L));
  }

  // get solid angle subtended by the ray (or really the area at this distance)
  const T_float ray_A = px_sr*dist*dist/(1<<(2*level));

  // get area of the current cell as V^2/3
  const T_float cell_A = pow(current_cell.volume(), 2./3);
  
  if(ray_A*integrations_per_cell_ < cell_A)
    // no need to subdivide, we are done
    return false;

  // need to subdivide. extract current pixel values
  const int px = ray_.scatterings() >> 16;
  const int py = ray_.scatterings() & 0x0000ffff;
  level++;

  if((px|py)&0x8000) {
    std::cerr << "Can't subdivide ray at level " << level << " -- no more bits\n";
    return false;
  }

  // save the ray intensity (can't use member because this function is
  // recursive)
  T_lambda original_intensity(make_thread_local_copy(ray_.intensity()));

  // and run them in turn
  for(int sx=0; sx<=1; ++sx)
    for(int sy=0; sy<=1; ++sy) {
      // this resets scatterings so we need to do this first
      ray_.set_position(cam_pos);

      const int npx = (px<<1) + sx;
      const int npy = (py<<1) + sy;
      ray_.scatterings() = (npx<<16) | npy;
      ray_.set_direction(-px2dir(cam, ray_.scatterings(), level));
      //the direction is inbound so to get out to the same position we
      //propagate a negative amount
      ray_.propagate(-dist);
      ray_.reset_length();

      // the position has changed, so we don't know where the ray is
      // anymore. We reset current_cell and this will force a new
      // location
      current_cell.reset();

      // and now call the integration function on this
      integrate_ray((level<<16)|cam, emission_integration, dist);
      ray_.set_intensity(original_intensity);
    }

  return true;
}
  
      
/** Calculates the intensity of the current ray at camera c by
    directly integrating the radiation transfer equation along the
    ray. The ray should already have the correct direction, position,
    etc. to end up at the camera. This function is similar to
    camera_ray except that it integrates the source function or
    emission instead of just attenuating the rays. */
template <typename dust_model_type, typename grid_type>
void 
mcrx::xfer<dust_model_type, grid_type>::
integrate_ray(int c, bool emission_integration, T_float dist)
{
  hpm::scoped_stage current(Camera_ray);

  // decode parameter to camera number and level
  const int cam = c&0x0000ffff;
  const int level = (c>>16);
  const T_float start_len = ray_.length();
  // the solid angle subtended by the pixel we are headed to
  const T_float px_sr =
  current_emergence_->get_camera(cam).area_scale(ray_.direction())*
  current_emergence_->get_camera(cam).pixel_normalized_area();
  
  // if dist<0, it's not set. then we need to recalculate dist here
  // since it's not saved in the queue
  if(dist<0) {
    const vec3d delta = 
      current_emergence_->get_camera(cam).position () - ray_.position();
    dist = mag(delta);
  }
  
  // 2.  See if the ray is outside the grid volume, and in that case,
  // if it will hit the grid volume (not going past the camera). We
  // only do this if the tracker has an invalid task. If the task is
  // valid, even if the tracker is not, then we have already
  // determined that the ray is on another task and we should fall
  // through to the push below
  if (!current_cell) {
    assert(current_cell.task()<0);
    propagate_external (dist);
  }

  // 3.  Assuming it hit the grid, integrate the ray through the grid
  // (up to max length=dist), checking at every cell whether it's
  // small enough that we need to subdivide
  hpm::push_stage(Propagate);
  while (current_cell) {
    if(subdivide_integration_ray(cam, dist-(ray_.length()-start_len),
				 px_sr, level, emission_integration)) {
      // the ray was subdivided, so the sub-rays have already been
      // completed. We are done.
      return;
    }
    if(emission_integration) {
      // this is the relevant part extracted from the propagate function

      const T_float vol = current_cell.volume();
      const typename T_cell::T_data* data(current_cell.data());
      bool hit_max;
      const T_float dl
	(current_cell.intersection_from_within(ray_, blitz::huge(T_float()),
					       dn_, hit_max));
      assert(dl>=0);
      i_out_ = (dl/vol)*data->get_emitter().get_emission();
      ray_.set_intensity(ray_.intensity() + i_out_);
      
      ASSERT_ALL(ray_.intensity() >= 0);    
      ray_.propagate(dl);
    }
    else {
      // because propagate does NOT reset current_cell if we hit max
      // dist, we need to do that manually here
      if(propagate<false, false, false, true> (dist))
	current_cell.reset();
    }
  }

  hpm::pop_stage();

  // If we come out here with an invalid tracker that points to
  // another task, push the ray. Note that this can happen because we
  // propagate out of the domain or because the cell search in
  // propagate_external returned a foreign task. If we left the
  // domain, task will be -1.
  if(current_cell.task() >=0 ) {
    push_ray(emission_integration ?
	     T_queue_item::emission : T_queue_item::integrate,
	     ray_, current_cell, norm_, c);
    ray_.reset_traversed_column();
    current_cell.reset();
    return;
  }

  // 4.  We have integrate all the way. Now we can add the intensity
  // to the camera. Since we are integrating the RTE, we already have
  // the correct solid angle dependence, there is no additional factor
  // to get surface brightness.

  ASSERT_ALL(ray_.intensity() >= 0);    
  DEBUG(2,cout << "Adding ray to camera, max i " \
	<< max(ray_.intensity()) << endl;);

  // when we add, we have to take into account the level of the ray to
  // give it appropriate weight
  add_to_camera(current_emergence_->get_camera(cam), ray_.position(), 
		ray_.intensity()*(1.0/(1<<(2*level))), ray_.doppler());
}


/** Initializes the supplied item. This is used by the mpi master to
    determine the structure of the message data. Because the ray_ and
    norm_ members are initialized in the constructor, we don't have to
    do anything special. */
template <typename dust_model_type, typename grid_type>
void
mcrx::xfer<dust_model_type, grid_type>::
init_queue_item(T_queue_item& item)
{
  const T_queue_item rso(T_queue_item::invalid, independent_copy(ray_),
			 current_cell.code(), independent_copy(norm_));
  item = rso;
}


/** Returns the column density obtained by integrating from the
    specified position in the specified direction. \todo This function
    is not MPI-safe. */
template <typename dust_model_type, typename grid_type>
mcrx::T_densities
mcrx::xfer<dust_model_type, grid_type>::
integrate_column_density(const vec3d& pos, const vec3d& dir)
{
  current_cell.reset();
  ray_.set_position(pos);
  ray_.set_direction(dir);

  propagate_external();

  while(current_cell)
    propagate<false, false, false, false>();

  return ray_.traversed_column();
}


#endif


